Skip to content

一、基本概念

Go 中的 channel 是一个队列,遵循先进先出的原则,负责协程之间的通信(Go 语言提倡不要通过共享内存来通信,而要通过通信来实现内存共享,CSP(Communicating Sequential Process)并发模型,就是通过 goroutine 和 channel 来实现的)

使用场景:

1、停止信号监听

2、定时任务

3、生产方和消费方解耦

4、控制并发数

二、底层数据结构

通过 var 声明或者 make 函数创建的 channel 变量是一个存储在函数栈帧上的指针,占用 8 个字节,指向堆上的 hchan 结构体

源码包中src/runtime/chan.go定义了 hchan 的数据结构:

一)hchan 结构体

type hchan struct {
 closed   uint32   // channel是否关闭的标志
 elemtype *_type   // channel中的元素类型

 // channel分为无缓冲和有缓冲两种。
 // 对于有缓冲的channel存储数据,使用了 ring buffer(环形缓冲区) 来缓存写入的数据,本质是循环数组
 // 为啥是循环数组?普通数组不行吗,普通数组容量固定更适合指定的空间,弹出元素时,普通数组需要全部都前移
 // 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
 buf      unsafe.Pointer // 指向底层循环数组的指针(环形缓冲区)
 qcount   uint           // 循环数组中的元素数量
 dataqsiz uint           // 循环数组的长度
 elemsize uint16 				 // 元素的大小
 sendx    uint           // 下一次写下标的位置
 recvx    uint           // 下一次读下标的位置

 // 尝试读取channel或向channel写入数据而被阻塞的goroutine
 recvq    waitq  // 读等待队列
 sendq    waitq  // 写等待队列

 lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}

二)等待队列

双向链表,包含一个头结点和一个尾结点,每个节点是一个 sudog 结构体变量,记录哪个协程在等待,等待的是哪个 channel,等待发送/接收的数据在哪里

type waitq struct {
   first *sudog
   last  *sudog
}

type sudog struct {
	g *g
	next *sudog
	prev *sudog
	elem unsafe.Pointer
	c        *hchan
	...
}

三、操作

一)创建

1、创建 channel 有两种,一种是带缓冲的 channel,一种是不带缓冲的 channel

// 带缓冲
ch := make(chan int, 3)
// 不带缓冲
ch := make(chan int)

2、使用 make(chan T, cap) 来创建 channel,make 语法会在编译时,转换为 makechan64makechan

func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

3、创建时的策略

Channel 类型内存分配策略
无缓冲 Channel仅为hchan结构本身分配内存,因为无缓冲 Channel 不用于存储数据。
有缓冲 Channel,元素不包含指针hchan结构和底层数组分配一段连续的内存地址,提高内存访问效率。
有缓冲 Channel,元素包含指针hchan结构和底层数组分配不同的内存地址,以更好地处理含有指针的元素(垃圾回收)。

二)发送

1、发送操作

ch <- 10

2、发送操作,编译时转换为runtime.chansend函数

阻塞式:调用 chansend 函数,并且 block=true

非阻塞式:调用 chansend 函数,并且 block=false

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

向 channel 中发送数据时大概分为两大块:检查和数据发送,数据发送流程如下:

如果 channel 的读等待队列存在接收者 goroutine,将数据直接发送给第一个等待的 goroutine, 唤醒接收的 goroutine

如果 channel 的读等待队列不存在接收者 goroutine

  • 如果循环数组 buf 未满,那么将会把数据发送到循环数组 buf 的队尾
  • 如果循环数组 buf 已满,这个时候就会走阻塞发送的流程,将当前 goroutine 加入写等待队列,并挂起等待唤醒

三)接收

1、接收操作

<ch

v := <ch

v, ok := <ch

2、接收操作,编译时转换为runtime.chanrecv函数

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

阻塞式:调用 chanrecv 函数,并且 block=true

非阻塞式:调用 chanrecv 函数,并且 block=false

向 channel 中接收数据时大概分为两大块,检查和数据发送,而数据接收流程如下:

如果 channel 的写等待队列存在发送者 goroutine

  • 如果是无缓冲 channel,直接从第一个发送者 goroutine 那里把数据拷贝给接收变量,唤醒发送的 goroutine
  • 如果是有缓冲 channel(已满),将循环数组 buf 的队首元素拷贝给接收变量,将第一个发送者 goroutine 的数据拷贝到 buf 循环数组队尾,唤醒发送的 goroutine

如果 channel 的写等待队列不存在发送者 goroutine

  • 如果循环数组 buf 非空,将循环数组 buf 的队首元素拷贝给接收变量
  • 如果循环数组 buf 为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列,并挂起等待唤醒

四)关闭

1、关闭操作

close(ch)

2、调用 close 函数,编译时转换为runtime.closechan函数

func closechan(c *hchan)

四、案例分析

package main

import (
	"fmt"
	"time"
	"unsafe"
)

func main() {
  // ch是长度为4的带缓冲的channel
  // 初始hchan结构体重的buf为空,sendx和recvx均为0
	ch := make(chan string, 4)
	fmt.Println(ch, unsafe.Sizeof(ch))
	go sendTask(ch)
	go receiveTask(ch)
	time.Sleep(1 * time.Second)
}

// G1是发送者
// 当G1向ch里发送数据时,首先会对buf加锁,然后将task存储的数据copy到buf中,然后sendx++,然后释放对buf的锁
func sendTask(ch chan string) {
	taskList := []string{"this", "is", "a", "demo"}
	for _, task := range taskList {
		ch <- task //发送任务到channel
	}
}

// G2是接收者
// 当G2消费ch的时候,会首先对buf加锁,然后将buf中的数据copy到task变量对应的内存里,然后recvx++,并释放锁
func receiveTask(ch chan string) {
	for {
		task := <-ch                  //接收任务
		fmt.Println("received", task) //处理任务
	}
}

总结 hchan 结构体的主要组成部分有四个:

  • 用来保存 goroutine 之间传递数据的循环数组:buf
  • 用来记录此循环数组当前发送或接收数据的下标值:sendx 和 recvx
  • 用于保存向该 chan 发送和从该 chan 接收数据被阻塞的 goroutine 队列: sendq 和 recvq
  • 保证 channel 写入和读取数据时线程安全的锁:lock

木川工作室 (微信:mcmc2024)